Spark流处理中定时更新广播变量值
在实际项目应用上,某些需求会有更新静态规则表的情况,如消息过滤规则、风控规则等。通常这样的表数据量不会大,在spark中使用广播变量的形式使用,而广播变量是不支持更新的,怎样在流处理过程中更新,下面分别论述Spark streaming和Structured streaming的场景。
一、Spark streaming
可以利用单例模式定时的删除已经广播的值,同时获取新的变量值重新广播,假如要广播的是RDS中的表,代码示例如下:
注意事项:
spark streaming会为每一个流创建job,为了不同job间互不影响,需在foreachRDD、transform算子内进行变量的广播操作此方法仅适用于spark streaming,structured streaming需使用其他方法做广播变量的更新
/**
* 定时更新广播变量,数据从jdbc数据源读取
*/
@Slf4j
public class JDBCBroadcastPeriodicUpdater {
private static final int PERIOD = 30 * 1000; //更新周期,秒
private static volatile JDBCBroadcastPeriodicUpdater instance;
private Broadcast broadcast;
private long lastUpdate = 0L;
private JDBCBroadcastPeriodicUpdater() {}
public static JDBCBroadcastPeriodicUpdater getInstance() {
if (instance == null) {
synchronized (JDBCBroadcastPeriodicUpdater.class) {
if (instance == null) {
instance = new JDBCBroadcastPeriodicUpdater();
}
}
}
return instance;
}
/**
* 更新广播变量
*/
public Broadcast updateAndGet(SparkSession spark, DruidDataSource dataSource, String sql) {
SparkContext sc = spark.sparkContext();
long now = System.currentTimeMillis();
long offset = now - lastUpdate;
if (offset > PERIOD || null == broadcast) {
if (broadcast != null) {
// 删除已获取的广播变量值
broadcast.unpersist();
}
lastUpdate = now;
// 重新广播新的变量值
List value = fetchBroadcastData(dataSource, sql);
broadcast = JavaSparkContext.fromSparkContext(sc).broadcast(value);
}
return broadcast;
}
/**
* 获取需要广播的数据
*/
@SneakyThrows
private List fetchBroadcastData(DruidDataSource dataSource, String sql) {
List result = new ArrayList();
DruidPooledConnection conn = dataSource.getConnection();
PreparedStatement ps = conn.prepareStatement(sql);
ResultSet data = ps.executeQuery();
ResultSetMetaData metaData = data.getMetaData();
int colCount = metaData.getColumnCount();
while (data.next()) {
HashMap row = new HashMap();
for (int i=0; i |